import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Summary:
 *      Fold: 基于初始值和自定义的FoldFunction滚动折叠后发出新值
 */
public class Fold
{
    public static void main(String[] args) throws Exception
    {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 输入: 用户行为。某个用户在某个时刻点击或浏览了某个商品，以及商品的价格。


//        下面是一些浏览记录
        DataStreamSource<UserAction> source = env.fromCollection(Arrays.asList(
                new UserAction("userID1", 1293984000, "click", "productID1", 10),
                new UserAction("userID2", 1293984001, "browse", "productID2", 8),
                new UserAction("userID2", 1293984002, "browse", "productID2", 8),
                new UserAction("userID2", 1293984003, "browse", "productID2", 8),
                new UserAction("userID1", 1293984002, "click", "productID1", 10),
                new UserAction("userID1", 1293984003, "click", "productID3", 10),
                new UserAction("userID1", 1293984004, "click", "productID1", 10)
        ));

        // 转换: KeyBy对数据重分区
        KeyedStream<UserAction, String> keyedStream = source.keyBy(new KeySelector<UserAction, String>() {
            @Override
            public String getKey(UserAction value) throws Exception {
                return value.getUserID();
            }
        });

        // 转换: Fold 基于初始值和FoldFunction滚动折叠
        SingleOutputStreamOperator<String> result = keyedStream.fold("浏览的商品及价格:", new FoldFunction<UserAction, String>() {
            @Override
            public String fold(String accumulator, UserAction value) throws Exception {
                if(accumulator.startsWith("userID")){
                    return accumulator + " -> "                        + value.getProductID()+":"+value.getProductPrice();
                }
                else {
                    return value.getUserID()+" " +accumulator + " -> " + value.getProductID()+":"+value.getProductPrice();
                }
            }
        });

        // 输出: 输出到控制台
        // 每一条数据都会触发计算并输出
        // userID1 浏览的商品及价格: -> productID1:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10 -> productID3:10
        // userID1 浏览的商品及价格: -> productID1:10 -> productID1:10 -> productID3:10 -> productID1:10
        // userID2 浏览的商品及价格: -> productID2:8
        // userID2 浏览的商品及价格: -> productID2:8 -> productID2:8
        // userID2 浏览的商品及价格: -> productID2:8 -> productID2:8 -> productID2:8
        result.print();


//        实验结果是同一个客户浏览过的商品都聚集到一起,实现浏览记录汇总.

        env.execute();

    }
}